package org.zjvis.datascience.service.dataset.quartz;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import javax.sql.DataSource;
import org.apache.commons.lang3.StringUtils;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.zjvis.datascience.common.dto.DatasetDTO;
import org.zjvis.datascience.common.dto.GreenPlumDTO;
import org.zjvis.datascience.common.dto.dataset.DatasetJsonInfo;
import org.zjvis.datascience.common.dto.datax.DataXConfigDTO;
import org.zjvis.datascience.common.util.RedisUtil;
import org.zjvis.datascience.common.util.RestTemplateUtil;
import org.zjvis.datascience.common.util.db.JDBCUtil;
import org.zjvis.datascience.service.DataXService;
import org.zjvis.datascience.service.DatasetActionService;
import org.zjvis.datascience.service.SqlQueryService;
import org.zjvis.datascience.service.dataprovider.GPDataProvider;
import org.zjvis.datascience.service.dataset.DataConfigEncryption;
import org.zjvis.datascience.service.dataset.DatasetService;
import org.zjvis.datascience.service.dataset.ImportDataService;
import org.zjvis.datascience.service.mapper.DatasetMapper;

/**
 * Quartz job executor for DataX scheduled incremental data imports.
 *
 * @description scheduled-task executor for DataX (imports incremental source-table rows into GreenPlum)
 * @date 2021-12-06
 */
@Component
@DisallowConcurrentExecution
public class DataXJob implements Job {

    private final static Logger log = LoggerFactory.getLogger(DataXJob.class);

    // Collaborators are resolved from the Spring context on every execution (see
    // initComponent): Quartz instantiates this Job itself, so field injection by
    // Spring does not happen here.
    private RedisUtil redisUtil;
    private RestTemplateUtil restTemplateUtil;
    private DatasetMapper datasetMapper;
    private DatasetService datasetService;
    private SqlQueryService sqlQueryService;
    private DatasetActionService datasetActionService;
    private DataXService dataXService;
    private GreenPlumDTO greenPlumDTO;
    private DataXConfigDTO dataXConfigDTO;
    private GPDataProvider gpDataProvider;
    private ImportDataService importDataService;

    /**
     * Looks up all required Spring beans by name, since this object is created by
     * Quartz rather than by the Spring container.
     */
    private void initComponent() {
        this.redisUtil = (RedisUtil) SpringContextJobUtil.getBean("redisUtil");
        this.restTemplateUtil = (RestTemplateUtil) SpringContextJobUtil.getBean("restTemplateUtil");
        this.datasetMapper = (DatasetMapper) SpringContextJobUtil.getBean("datasetMapper");
        this.datasetService = (DatasetService) SpringContextJobUtil.getBean("datasetService");
        this.sqlQueryService = (SqlQueryService) SpringContextJobUtil.getBean("sqlQueryService");
        this.datasetActionService = (DatasetActionService) SpringContextJobUtil.getBean("datasetActionService");
        this.dataXService = (DataXService) SpringContextJobUtil.getBean("dataXService");
        this.greenPlumDTO = (GreenPlumDTO) SpringContextJobUtil.getBean("greenPlumDTO");
        this.dataXConfigDTO = (DataXConfigDTO) SpringContextJobUtil.getBean("dataXConfigDTO");
        this.importDataService = (ImportDataService) SpringContextJobUtil.getBean("importDataService");
        this.gpDataProvider = (GPDataProvider) SpringContextJobUtil.getBean("gpDataProvider");
    }

    /**
     * Quartz entry point. Acquires a Redis-based distributed lock per target table
     * (5-minute TTL) so that, in a clustered deployment, only one node performs the
     * import for any given table, then delegates to {@link #executeJob}.
     *
     * @param context Quartz execution context carrying the job key and the
     *                "targetTable" entry in its JobDataMap
     */
    @Override
    public void execute(JobExecutionContext context) {
        initComponent();
        String jobName = context.getJobDetail().getKey().getName();
        String targetTable = context.getJobDetail().getJobDataMap().getString("targetTable");
        if (StringUtils.isBlank(targetTable)) {
            log.error("Scheduling task error, targetTable is null");
            return;
        }
        log.info("Try getting lock...");
        // Distributed lock so only a single process in the cluster imports into
        // this particular table at a time.
        Boolean flag = redisUtil.setnx("dataX_" + targetTable, "true", 5 * 60L);
        if (Boolean.TRUE.equals(flag)) {
            try {
                log.info("Starting scheduling tasks，task id: {}, targetTable: {}", jobName, targetTable);
                executeJob(context);
            } catch (Exception e) {
                log.error("job execution error!", e);
            }
            // NOTE(review): the lock is never released explicitly and only expires
            // with its 5-minute TTL — confirm whether RedisUtil exposes a delete
            // operation to call in a finally block, so a fast run does not block
            // the next trigger on another node for the full TTL.
        } else {
            log.info("Scheduling task get lock fail, id: {}, targetTable: {}", jobName, targetTable);
        }
    }

    /**
     * Performs one incremental import cycle: reads the dataset definition, decrypts
     * the source datasource config, counts rows newer than the stored bookmark
     * ("lastValue" of "incrementColumn"), imports them into the GreenPlum target
     * table, advances the bookmark, and records a data-change action.
     *
     * NOTE(review): @Transactional has no effect on this call path — executeJob is
     * self-invoked from execute() on an object Quartz created, so no Spring proxy
     * intercepts it. If transactional semantics are required, move this logic into
     * a Spring-managed service and call it through the container.
     *
     * @param context Quartz execution context; the job name encodes the dataset id
     *                as the segment after the first underscore ("prefix_&lt;id&gt;")
     * @throws Exception any failure is propagated to execute(), which logs it
     */
    @Transactional
    public void executeJob(JobExecutionContext context) throws Exception {
        String jobName = context.getJobDetail().getKey().getName();
        long id = Long.parseLong(jobName.split("_")[1]);
        // Load the dataset this job was scheduled for; it may have been deleted
        // after the trigger was registered.
        DatasetDTO datasetDTO = datasetMapper.queryById(id);
        if (datasetDTO == null) {
            log.error("executeJob: dataset {} not found, skip import", id);
            return;
        }
        JSONObject dataJsonObject = JSON.parseObject(datasetDTO.getDataJson());

        // Decrypt and parse the source datasource configuration (per-user key).
        String dataConfigStr = datasetDTO.getDataConfig();
        JSONObject dataConfig = null;
        try {
            dataConfig = JSON.parseObject(
                    DataConfigEncryption.decrypt(datasetDTO.getUserId().toString(), dataConfigStr));
        } catch (Exception e) {
            log.error("dataX execute error!", e);
        }
        if (dataConfig == null) {
            log.error("executeJob: Can't get datasource config string from db, please check your data config.");
            return;
        }
        String sourceTable = dataConfig.getString("tableName");
        String sourceDbType = dataConfig.getString("databaseType");
        String url = dataConfig.getString("url");
        String user = dataConfig.getString("user");
        String password = dataConfig.getString("password");
        if (sourceTable == null || sourceDbType == null || url == null || user == null || password == null) {
            log.error("executeJob: Some information missing in datasource config string, please check your data config.");
            return;
        }

        DataSource fromDs = JDBCUtil.getDataSource(url, user, password);

        // Target table name in GreenPlum.
        String targetTable = dataJsonObject.getString("table");

        // Incremental-import bookmark: which column to compare and the last value
        // already imported.
        JSONObject incrementalConfigJson = JSON.parseObject(datasetDTO.getIncrementalDataConfig());
        if (incrementalConfigJson == null) {
            log.error("executeJob: incremental data config missing for dataset {}, skip import", id);
            return;
        }
        String incrementColumn = incrementalConfigJson.getString("incrementColumn");
        String lastValue = incrementalConfigJson.getString("lastValue");

        DatasetJsonInfo dj = JSONObject.parseObject(datasetDTO.getDataJson(), DatasetJsonInfo.class);

        // Only touch GreenPlum when the source actually has rows newer than the bookmark.
        int size = JDBCUtil.getDataSize(fromDs, sourceTable, incrementColumn, lastValue);
        if (size > 0) {
            importDataService.executeImport(sourceTable, fromDs, targetTable, size,
                    datasetDTO.getUserId(), sourceDbType, dj, false, lastValue, incrementColumn);
            // Advance the bookmark to the max value now present in the target table.
            incrementalConfigJson.put("lastValue", datasetService.getMaxValue(targetTable, incrementColumn));
            DatasetDTO newDatasetDTO = DatasetDTO.builder().id(id)
                    .incrementalDataConfig(incrementalConfigJson.toString()).build();
            datasetMapper.updateIncrementalConfig(newDatasetDTO);
            // Record the data-change action for auditing/history.
            datasetActionService.insertAction(Long.valueOf(size), dataJsonObject, datasetMapper.queryById(id));
        }
    }
}
